package com.atguigu.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.utils.DateFormatUtil;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * Traffic-domain DWD job: splits the raw application log stream into separate fact topics.
 *
 * <p>Pipeline: log server (.log) -> Flume -> Kafka ({@code topic_log}) -> Flink (BaseLogApp) -> Kafka.
 *
 * <p>Processing steps performed by {@link #main(String[])}:
 * <ol>
 *   <li>Parse each Kafka record as JSON; records that are not usable go to the "Dirty" side output.</li>
 *   <li>Key the stream by {@code common.mid} and correct the {@code common.is_new} flag using keyed state.</li>
 *   <li>Split the stream: page logs on the main output; start, display, action and error logs on
 *       side outputs.</li>
 *   <li>Print every stream and write each one to its own Kafka topic.</li>
 * </ol>
 *
 * @className: BaseLogApp
 * @author: LinCong
 * @description: Traffic domain - unprocessed transaction fact tables
 * @date: 2023/1/17 17:59
 * @version: 1.0
 */
public class BaseLogApp {

    /**
     * Builds and runs the log-splitting job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the Flink job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        // 1. Obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        // 1.1 Enable checkpointing (currently disabled).
//        env.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);
//        // Checkpoint timeout: a checkpoint that has not completed within 10 minutes is discarded (default 10 minutes).
//        env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);
//        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
//        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(120000L);
//        // Fixed-delay restart strategy (max restart attempts, delay between attempts).
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
        // 1.2 Configure the state backend (currently disabled).
//        env.setStateBackend(new HashMapStateBackend());
//        System.setProperty("HADOOP_USER_NAME", "kevin");
//        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop3cluster/211126/ck");

        // 2. Create the source stream by consuming the Kafka topic "topic_log".
        String topic = "topic_log";
        String groupId = "base_log_app";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));

        // 3. Filter out records that cannot be processed and route them to the "Dirty" side output.
        OutputTag<String> dirtyTag = new OutputTag<String>("Dirty"){};
        SingleOutputStreamOperator<JSONObject> jsonObjectDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
                try {
                    // parseObject yields null for blank input, so a null result is dirty data too.
                    JSONObject jsonObject = JSON.parseObject(value);
                    JSONObject common = jsonObject == null ? null : jsonObject.getJSONObject("common");
                    // The stream is keyed by common.mid below; a record without it would make the
                    // key selector fail the whole job, so it is treated as dirty data instead.
                    if (common == null || common.getString("mid") == null) {
                        ctx.output(dirtyTag, value);
                    } else {
                        out.collect(jsonObject);
                    }
                } catch (Exception e) {
                    // Not valid JSON (or "common" is not an object): keep the raw text for inspection.
                    ctx.output(dirtyTag, value);
                }
            }
        });
        // Fetch the dirty-data side output and print it.
        DataStream<String> dirtyDS = jsonObjectDS.getSideOutput(dirtyTag);
        dirtyDS.print("Dirty>>>>>>");

        // 4. Key the stream by device id (common.mid).
        KeyedStream<JSONObject, String> keyedStream = jsonObjectDS.keyBy(json -> json.getJSONObject("common").getString("mid"));

        // 5. Use keyed state to validate the new/returning visitor flag.
        SingleOutputStreamOperator<JSONObject> jsonObjectWithNewFlagDS = keyedStream.map(new RichMapFunction<JSONObject, JSONObject>() {
            // Per-mid date of the first recorded visit, as produced by DateFormatUtil.toDate.
            private ValueState<String> lastVisitState;

            @Override
            public void open(Configuration parameters) throws Exception {
                lastVisitState = getRuntimeContext().getState(new ValueStateDescriptor<String>("last-visit", String.class));
            }

            @Override
            public JSONObject map(JSONObject value) throws Exception {
                String isNew = value.getJSONObject("common").getString("is_new");
                Long ts = value.getLong("ts");
                String curDate = DateFormatUtil.toDate(ts);
                String lastDate = lastVisitState.value();
                // is_new: "0" marks a returning visitor, "1" marks a new visitor.
                if ("1".equals(isNew)) {
                    if (lastDate == null) {
                        // First time this mid is seen: remember the date of the visit.
                        lastVisitState.update(curDate);
                    } else if (!lastDate.equals(curDate)) {
                        // State holds a different date, so the "new" flag is wrong: correct it.
                        value.getJSONObject("common").put("is_new", "0");
                    }
                } else if (lastDate == null) {
                    // Flagged as returning but no state yet: store the previous day so that a
                    // later record flagged "1" for this mid is corrected to "0".
                    lastVisitState.update(DateFormatUtil.toDate(ts - 1000 * 60 * 60 * 24L));
                }

                return value;
            }
        });

        // 6. Split the stream with side outputs: page logs stay on the main stream;
        //    start, display, action and error logs go to side outputs.
        OutputTag<String> startTag = new OutputTag<String>("start"){};
        OutputTag<String> displayTag = new OutputTag<String>("display"){};
        OutputTag<String> actionTag = new OutputTag<String>("action"){};
        OutputTag<String> errorTag = new OutputTag<String>("error"){};

        SingleOutputStreamOperator<String> pageDS = jsonObjectWithNewFlagDS.process(new ProcessFunction<JSONObject, String>() {
            @Override
            public void processElement(JSONObject value, Context ctx, Collector<String> out) throws Exception {
                // Try to read the error section.
                String err = value.getString("err");
                if (err != null) {
                    // Emit the complete record to the error side output.
                    ctx.output(errorTag, value.toJSONString());
                }
                // Drop the error section before further splitting.
                value.remove("err");

                // Try to read the start section.
                String start = value.getString("start");
                if (start != null) {
                    // Emit the record to the start side output.
                    ctx.output(startTag, value.toJSONString());
                } else {
                    // Common section, page id and timestamp shared with the derived records.
                    String common = value.getString("common");
                    String pageId = value.getJSONObject("page").getString("page_id");
                    Long ts = value.getLong("ts");

                    // Try to read the display (exposure) entries.
                    JSONArray displays = value.getJSONArray("displays");
                    if (displays != null && displays.size() > 0) {
                        // Emit each display entry to the display side output.
                        for (int i = 0; i < displays.size(); i++) {
                            JSONObject display = displays.getJSONObject(i);
                            display.put("common", common);
                            display.put("page_id", pageId);
                            display.put("ts", ts);
                            ctx.output(displayTag, display.toJSONString());
                        }
                    }

                    // Try to read the action entries.
                    JSONArray actions = value.getJSONArray("actions");
                    if (actions != null && actions.size() > 0) {
                        // Emit each action entry to the action side output.
                        // No "ts" is added here; presumably each action carries its own timestamp - TODO confirm.
                        for (int i = 0; i < actions.size(); i++) {
                            JSONObject action = actions.getJSONObject(i);
                            action.put("common", common);
                            action.put("page_id", pageId);
                            ctx.output(actionTag, action.toJSONString());
                        }
                    }

                    // Remove the display and action entries, then emit the page log on the main output.
                    value.remove("displays");
                    value.remove("actions");
                    out.collect(value.toJSONString());
                }
            }
        });

        // 7. Extract the side output streams.
        DataStream<String> startDS = pageDS.getSideOutput(startTag);
        DataStream<String> displayDS = pageDS.getSideOutput(displayTag);
        DataStream<String> actionDS = pageDS.getSideOutput(actionTag);
        DataStream<String> errorDS = pageDS.getSideOutput(errorTag);

        // 8. Print each stream and write it to its Kafka topic.
        pageDS.print("Page>>>>>>");
        startDS.print("Start>>>>>>");
        displayDS.print("Display>>>>>>");
        actionDS.print("Action>>>>>>");
        errorDS.print("Error>>>>>>");

        // Kafka topic names for each kind of log.
        String pageTopic = "dwd_traffic_page_log";
        String startTopic = "dwd_traffic_start_log";
        String displayTopic = "dwd_traffic_display_log";
        String actionTopic = "dwd_traffic_action_log";
        String errorTopic = "dwd_traffic_error_log";

        pageDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(pageTopic));
        startDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(startTopic));
        displayDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(displayTopic));
        actionDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(actionTopic));
        errorDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(errorTopic));

        // 9. Launch the job.
        env.execute("BaseLogApp");
    }
}
